package rmq

import (
	"encoding/json"
	"gitee.com/masaichi/mlib/rmq/rmqconstant"
	"gitee.com/masaichi/mlib/rmq/rmqlib"
	"gitee.com/masaichi/mlib/rmq/rmqstruct"
	"sync"
)

var once sync.Once
var onceDelay sync.Once
var onceFanout sync.Once

func IntoQueue(param rmqstruct.QueueParam) {
	//1. 初始化连接
	once.Do(func() {
		//初始化交换机和队列绑定
		rmqlib.CommonInit()
	})
	mq := rmqlib.NewMQ()
	//开启confirm模式
	mq.SetConfirm()
	//循环切片
	for _, item := range param.Data {
		bstr, _ := json.Marshal(rmqstruct.NewQueueParamItem(param.Service, item, param.Callback))
		err := mq.SendMessage(rmqconstant.COMMON_ROUTING_KEY, rmqconstant.COMMON_EXCHANGE, string(bstr))
		if err != nil {
			panic(err)
		}
	}
	//开一个协程处理confirm
	go mq.ListenConfirm()
}

//进入延迟队列
func IntoDelayQueue(param rmqstruct.QueueParam, delay int) {
	//1. 初始化连接
	onceDelay.Do(func() {
		//初始化交换机和队列绑定
		rmqlib.CommonDelayInit()
	})
	mq := rmqlib.NewMQ()
	//开启confirm模式
	mq.SetConfirm()
	//循环切片
	for _, item := range param.Data {
		bstr, _ := json.Marshal(rmqstruct.NewQueueParamItem(param.Service, item, param.Callback))
		err := mq.SendDelayMessage(rmqconstant.COMMON_DELAY_ROUTING_KEY, rmqconstant.COMMON_DELAY_EXCHANGE, string(bstr), delay)
		if err != nil {
			panic(err)
		}
	}
	go mq.ListenConfirm()
}

//进入广播交换机，默认交换机
func IntoCommonFanoutQueue(param rmqstruct.QueueBroadcastParam) {
	//一台交换机，广播不同的消息，需要针对这些消息进行判断处理，这里不做绑定处理
	//1. 初始化连接
	onceFanout.Do(func() {
		//初始化交换机和队列绑定
		rmqlib.CommonFanoutInit()
	})
	mq := rmqlib.NewMQ()
	//开启confirm模式
	mq.SetConfirm()
	//循环切片
	for _, item := range param.Data {
		bstr, _ := json.Marshal(rmqstruct.NewQueueBroadcastParamItem(param.Event, item, param.Callback))
		err := mq.SendFanoutMessage(rmqconstant.COMMON_FANOUT_EXCHANGE, string(bstr))
		if err != nil {
			panic(err)
		}
	}
	go mq.ListenConfirm()
}

//自定义广播交换机
func IntoFanoutQueue(param rmqstruct.QueueBroadcastParam, exchange string) {
	//一台交换机，广播不同的消息，需要针对这些消息进行判断处理，这里不做绑定处理
	//1. 初始化连接
	onceFanout.Do(func() {
		//初始化交换机和队列绑定
		rmqlib.CommonFanoutInit()
	})
	mq := rmqlib.NewMQ()
	//开启confirm模式
	mq.SetConfirm()
	//循环切片
	for _, item := range param.Data {
		bstr, _ := json.Marshal(rmqstruct.NewQueueBroadcastParamItem(param.Event, item, param.Callback))
		err := mq.SendFanoutMessage(exchange, string(bstr))
		if err != nil {
			panic(err)
		}
	}
	go mq.ListenConfirm()
}
